# -*- coding: utf-8 -*-

# Import python libs
import logging

# Import Salt Testing libs
from salttesting import skipIf
from salttesting.helpers import (
    destructiveTest,
    ensure_in_syspath
)
ensure_in_syspath('../../')

# Import salt libs
import integration
from salt.modules import mysql as mysqlmod

log = logging.getLogger(__name__)

NO_MYSQL = False
try:
    import MySQLdb  # pylint: disable=W0611
except Exception:
    NO_MYSQL = True


@skipIf(
    NO_MYSQL,
    'Please install MySQL bindings and a MySQL Server before running'
    'MySQL integration tests.'
)
class MysqlModuleDbTest(integration.ModuleCase,
                        integration.SaltReturnAssertsMixIn):
    '''
    Module testing database creation on a real MySQL Server.
    '''

    user = 'root'
    password = 'poney'

    @destructiveTest
    def setUp(self):
        '''
        Test presence of MySQL server, enforce a root password
        '''
        super(MysqlModuleDbTest, self).setUp()
        NO_MYSQL_SERVER = True
        # now ensure we know the mysql root password
        # one of theses two at least should work
        ret1 = self.run_state(
            'cmd.run',
             name='mysqladmin --host="localhost" -u '
               + self.user
               + ' flush-privileges password "'
               + self.password
               + '"'
        )
        ret2 = self.run_state(
            'cmd.run',
             name='mysqladmin --host="localhost" -u '
               + self.user
               + ' --password="'
               + self.password
               + '" flush-privileges password "'
               + self.password
               + '"'
        )
        key, value = ret2.popitem()
        if value['result']:
            NO_MYSQL_SERVER = False
        else:
            self.skipTest('No MySQL Server running, or no root access on it.')

    def _db_creation_loop(self,
                          db_name,
                          returning_name,
                          test_conn=False,
                          **kwargs):
        '''
        Used in db testCase, create, check exists, check in list and removes.
        '''
        ret = self.run_function(
            'mysql.db_create',
            name=db_name,
            **kwargs
        )
        self.assertEqual(
            True,
            ret,
            'Problem while creating db for db name: {0!r}'.format(db_name)
        )
        # test db exists
        ret = self.run_function(
            'mysql.db_exists',
            name=db_name,
            **kwargs
        )
        self.assertEqual(
            True,
            ret,
            'Problem while testing db exists for db name: {0!r}'.format(db_name)
        )
        # List db names to ensure db is created with the right utf8 string
        ret = self.run_function(
            'mysql.db_list',
            **kwargs
        )
        if not isinstance(ret, list):
            raise AssertionError(
                ('Unexpected query result while retrieving databases list'
                 ' {0!r} for {1!r} test').format(
                         ret,
                         db_name
                    )
                )
        self.assertIn(
            returning_name,
            ret,
            ('Problem while testing presence of db name in db lists'
             ' for db name: {0!r} in list {1!r}').format(
                          db_name,
                          ret
        ))

        if test_conn:
            # test connections on database with root user
            ret = self.run_function(
                'mysql.query',
                database=db_name,
                query='SELECT 1',
                **kwargs
            )
            if not isinstance(ret, dict) or 'results' not in ret:
                raise AssertionError(
                    ('Unexpected result while testing connection'
                    ' on database : {0}').format(
                        repr(db_name)
                    )
                )
            self.assertEqual([['1']], ret['results'])

        # Now remove database
        ret = self.run_function(
            'mysql.db_remove',
            name=db_name,
            **kwargs
        )
        self.assertEqual(
            True,
            ret,
            'Problem while removing db for db name: {0!r}'.format(db_name)
        )

    @destructiveTest
    def test_database_creation_level1(self):
        '''
        Create database, test presence, then drop db. All theses with complex names.
        '''
        # name with space
        db_name = 'foo 1'
        self._db_creation_loop(db_name=db_name,
                               returning_name=db_name,
                               test_conn=True,
                               connection_user=self.user,
                               connection_pass=self.password
        )

        # ```````
        # create
        # also with character_set and collate only
        ret = self.run_function(
          'mysql.db_create',
          name='foo`2',
          character_set='utf8',
          collate='utf8_general_ci',
          connection_user=self.user,
          connection_pass=self.password
        )
        self.assertEqual(True, ret)
        # test db exists
        ret = self.run_function(
          'mysql.db_exists',
          name='foo`2',
          connection_user=self.user,
          connection_pass=self.password
        )
        self.assertEqual(True, ret)
        # redoing the same should fail
        # even with other character sets or collations
        ret = self.run_function(
          'mysql.db_create',
          name='foo`2',
          character_set='utf8',
          collate='utf8_general_ci',
          connection_user=self.user,
          connection_pass=self.password
        )
        self.assertEqual(False, ret)
        # redoing the same should fail
        ret = self.run_function(
          'mysql.db_create',
          name='foo`2',
          character_set='utf8',
          collate='utf8_general_ci',
          connection_user=self.user,
          connection_pass=self.password
        )
        self.assertEqual(False, ret)
        # Now remove database
        ret = self.run_function(
          'mysql.db_remove',
          name='foo`2',
          connection_user=self.user,
          connection_pass=self.password
        )
        self.assertEqual(True, ret)

        # '''''''
        # create
        # also with character_set only
        db_name = "foo'3"
        self._db_creation_loop(db_name=db_name,
                               returning_name=db_name,
                               test_conn=True,
                               character_set='utf8',
                               connection_user=self.user,
                               connection_pass=self.password
        )

        # """"""""
        # also with collate only
        db_name = 'foo"4'
        self._db_creation_loop(db_name=db_name,
                               returning_name=db_name,
                               test_conn=True,
                               collate='utf8_general_ci',
                               connection_user=self.user,
                               connection_pass=self.password
        )
        # fuzzy
        db_name = '<foo` --"5>'
        self._db_creation_loop(db_name=db_name,
                               returning_name=db_name,
                               test_conn=True,
                               connection_user=self.user,
                               connection_pass=self.password
        )

    @destructiveTest
    def test_mysql_dbname_character_percent(self):
        '''
        Play with the '%' character problems

        This character should be escaped in the form '%%' on queries, but only
        when theses queries have arguments. It is also a special character
        in LIKE SQL queries. Finally it is used to indicate query arguments.
        '''
        db_name1 = "foo%1_"
        db_name2 = "foo%12"
        ret = self.run_function(
          'mysql.db_create',
          name=db_name1,
          character_set='utf8',
          collate='utf8_general_ci',
          connection_user=self.user,
          connection_pass=self.password
        )
        self.assertEqual(True, ret)
        ret = self.run_function(
          'mysql.db_create',
          name=db_name2,
          connection_user=self.user,
          connection_pass=self.password
        )
        self.assertEqual(True, ret)
        ret = self.run_function(
          'mysql.db_remove',
          name=db_name1,
          connection_user=self.user,
          connection_pass=self.password
        )
        self.assertEqual(True, ret)
        ret = self.run_function(
            'mysql.db_exists',
            name=db_name1,
            connection_user=self.user,
            connection_pass=self.password
        )
        self.assertEqual(False, ret)
        ret = self.run_function(
            'mysql.db_exists',
            name=db_name2,
            connection_user=self.user,
            connection_pass=self.password
        )
        self.assertEqual(True, ret)
        ret = self.run_function(
          'mysql.db_remove',
          name=db_name2,
          connection_user=self.user,
          connection_pass=self.password
        )
        self.assertEqual(True, ret)

    @destructiveTest
    def test_database_creation_utf8(self):
        '''
        Test support of utf8 in database names
        '''
        # Simple accents : using utf8 string
        db_name_unicode = u'notam\xe9rican'
        # same as 'notamérican' because of file encoding
        # but ensure it on this test
        db_name_utf8 = 'notam\xc3\xa9rican'
        # FIXME: MySQLdb problems on conn strings containing
        # utf-8 on user name of db name prevent conn test
        self._db_creation_loop(db_name=db_name_utf8,
                               returning_name=db_name_utf8,
                               test_conn=False,
                               connection_user=self.user,
                               connection_pass=self.password,
                               connection_charset='utf8',
                               saltenv={"LC_ALL": "en_US.utf8"}
        )
        # test unicode entry will also return utf8 name
        self._db_creation_loop(db_name=db_name_unicode,
                               returning_name=db_name_utf8,
                               test_conn=False,
                               connection_user=self.user,
                               connection_pass=self.password,
                               connection_charset='utf8',
                               saltenv={"LC_ALL": "en_US.utf8"}
        )
        # Using more complex unicode characters:
        db_name_unicode = u'\u6a19\u6e96\u8a9e'
        # same as '標準語' because of file encoding
        # but ensure it on this test
        db_name_utf8 = '\xe6\xa8\x99\xe6\xba\x96\xe8\xaa\x9e'
        self._db_creation_loop(db_name=db_name_utf8,
                               returning_name=db_name_utf8,
                               test_conn=False,
                               connection_user=self.user,
                               connection_pass=self.password,
                               connection_charset='utf8',
                               saltenv={"LC_ALL": "en_US.utf8"}
        )
        # test unicode entry will also return utf8 name
        self._db_creation_loop(db_name=db_name_unicode,
                               returning_name=db_name_utf8,
                               test_conn=False,
                               connection_user=self.user,
                               connection_pass=self.password,
                               connection_charset='utf8',
                               saltenv={"LC_ALL": "en_US.utf8"}
        )

    @destructiveTest
    def test_database_maintenance(self):
        '''
        Test maintenance operations on a created database
        '''
        dbname = u"foo%'-- `\"'"
        # create database
        # but first silently try to remove it
        # in case of previous tests failures
        ret = self.run_function(
          'mysql.db_remove',
          name=dbname,
          connection_user=self.user,
          connection_pass=self.password
        )
        ret = self.run_function(
          'mysql.db_create',
          name=dbname,
          character_set='utf8',
          collate='utf8_general_ci',
          connection_user=self.user,
          connection_pass=self.password
        )
        self.assertEqual(True, ret)
        # test db exists
        ret = self.run_function(
          'mysql.db_exists',
          name=dbname,
          connection_user=self.user,
          connection_pass=self.password
        )
        self.assertEqual(True, ret)
        # Create 3 tables
        tablenames = {'A%table "`1': 'MYISAM',
                      'B%table \'`2': 'InnoDB',
                      'Ctable --`3': 'MEMORY'
                     }
        for tablename, engine in iter(sorted(tablenames.iteritems())):
            # prepare queries
            create_query = ('CREATE TABLE {tblname} ('
                ' id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,'
                ' data VARCHAR(100)) ENGINE={engine};'.format(
                    tblname=mysqlmod.quote_identifier(tablename),
                    engine=engine,
                ))
            insert_query = ('INSERT INTO {tblname} (data)'
                ' VALUES '.format(
                    tblname=mysqlmod.quote_identifier(tablename)
            ))
            delete_query = ('DELETE from  {tblname}'
                ' order by rand() limit 50;'.format(
                    tblname=mysqlmod.quote_identifier(tablename)
            ))
            for x in range(100):
                insert_query += "('foo"+str(x)+"'),"
            insert_query += "('bar');"

            # populate database
            log.info('Adding table {0!r}'.format(tablename,))
            ret = self.run_function(
              'mysql.query',
              database=dbname,
              query=create_query,
              connection_user=self.user,
              connection_pass=self.password
            )
            if not isinstance(ret, dict) or 'rows affected' not in ret:
                raise AssertionError(
                    ('Unexpected query result while populating test table'
                     ' {0!r} : {1!r}').format(
                         tablename,
                         ret,
                    )
                )
            self.assertEqual(ret['rows affected'], 0)
            log.info('Populating table {0!r}'.format(tablename,))
            ret = self.run_function(
              'mysql.query',
              database=dbname,
              query=insert_query,
              connection_user=self.user,
              connection_pass=self.password
            )
            if not isinstance(ret, dict) or 'rows affected' not in ret:
                raise AssertionError(
                    ('Unexpected query result while populating test table'
                     ' {0!r} : {1!r}').format(
                         tablename,
                         ret,
                    )
                )
            self.assertEqual(ret['rows affected'], 101)
            log.info('Removing some rows on table{0!r}'.format(tablename,))
            ret = self.run_function(
              'mysql.query',
              database=dbname,
              query=delete_query,
              connection_user=self.user,
              connection_pass=self.password
            )
            if not isinstance(ret, dict) or 'rows affected' not in ret:
                raise AssertionError(
                    ('Unexpected query result while removing rows on test table'
                     ' {0!r} : {1!r}').format(
                         tablename,
                         ret,
                    )
                )
            self.assertEqual(ret['rows affected'], 50)
        # test check/repair/opimize on 1 table
        tablename = 'A%table "`1'
        ret = self.run_function(
          'mysql.db_check',
          name=dbname,
          table=tablename,
          connection_user=self.user,
          connection_pass=self.password
        )
        # Note that returned result does not quote_identifier of table and db
        self.assertEqual(ret, [{'Table': dbname+'.'+tablename,
                                'Msg_text': 'OK',
                                'Msg_type': 'status',
                                'Op': 'check'}]
        )
        ret = self.run_function(
          'mysql.db_repair',
          name=dbname,
          table=tablename,
          connection_user=self.user,
          connection_pass=self.password
        )
        # Note that returned result does not quote_identifier of table and db
        self.assertEqual(ret, [{'Table': dbname+'.'+tablename,
                                'Msg_text': 'OK',
                                'Msg_type': 'status',
                                'Op': 'repair'}]
        )
        ret = self.run_function(
          'mysql.db_optimize',
          name=dbname,
          table=tablename,
          connection_user=self.user,
          connection_pass=self.password
        )
        # Note that returned result does not quote_identifier of table and db
        self.assertEqual(ret, [{'Table': dbname+'.'+tablename,
                                'Msg_text': 'OK',
                                'Msg_type': 'status',
                                'Op': 'optimize'}]
        )

        # test check/repair/opimize on all tables
        ret = self.run_function(
          'mysql.db_check',
          name=dbname,
          connection_user=self.user,
          connection_pass=self.password
        )
        expected = []
        for tablename, engine in iter(sorted(tablenames.iteritems())):
            if engine is 'MEMORY':
                expected.append([{
                    'Table': dbname+'.'+tablename,
                    'Msg_text': ("The storage engine for the table doesn't"
                                 " support check"),
                    'Msg_type': 'note',
                    'Op': 'check'
                }])
            else:
                expected.append([{
                    'Table': dbname+'.'+tablename,
                    'Msg_text': 'OK',
                    'Msg_type': 'status',
                    'Op': 'check'
                }])
        self.assertEqual(ret, expected)

        ret = self.run_function(
          'mysql.db_repair',
          name=dbname,
          connection_user=self.user,
          connection_pass=self.password
        )
        expected = []
        for tablename, engine in iter(sorted(tablenames.iteritems())):
            if engine is 'MYISAM':
                expected.append([{
                    'Table': dbname+'.'+tablename,
                    'Msg_text': 'OK',
                    'Msg_type': 'status',
                    'Op': 'repair'
                }])
            else:
                expected.append([{
                    'Table': dbname+'.'+tablename,
                    'Msg_text': ("The storage engine for the table doesn't"
                                 " support repair"),
                    'Msg_type': 'note',
                    'Op': 'repair'
                }])
        self.assertEqual(ret, expected)

        ret = self.run_function(
          'mysql.db_optimize',
          name=dbname,
          connection_user=self.user,
          connection_pass=self.password
        )

        expected = []
        for tablename, engine in iter(sorted(tablenames.iteritems())):
            if engine is 'MYISAM':
                expected.append([{
                    'Table': dbname+'.'+tablename,
                    'Msg_text': 'OK',
                    'Msg_type': 'status',
                    'Op': 'optimize'
                }])
            elif engine is 'InnoDB':
                expected.append([{
                    'Table': dbname+'.'+tablename,
                    'Msg_text': ("Table does not support optimize, "
                                 "doing recreate + analyze instead"),
                    'Msg_type': 'note',
                    'Op': 'optimize'
                },
                {
                    'Table': dbname+'.'+tablename,
                    'Msg_text': 'OK',
                    'Msg_type': 'status',
                    'Op': 'optimize'
                }])
            elif engine is 'MEMORY':
                expected.append([{
                    'Table': dbname+'.'+tablename,
                    'Msg_text': ("The storage engine for the table doesn't"
                                 " support optimize"),
                    'Msg_type': 'note',
                    'Op': 'optimize'
                }])
        self.assertEqual(ret, expected)
        # Teardown, remove database
        ret = self.run_function(
          'mysql.db_remove',
          name=dbname,
          connection_user=self.user,
          connection_pass=self.password
        )
        self.assertEqual(True, ret)


@skipIf(
    NO_MYSQL,
    'Please install MySQL bindings and a MySQL Server before running'
    'MySQL integration tests.'
)
class MysqlModuleUserTest(integration.ModuleCase,
                      integration.SaltReturnAssertsMixIn):
    '''
    User Creation and connection tests
    '''

    user = 'root'
    password = 'poney'

    @destructiveTest
    def setUp(self):
        '''
        Test presence of MySQL server, enforce a root password
        '''
        super(MysqlModuleUserTest, self).setUp()
        NO_MYSQL_SERVER = True
        # now ensure we know the mysql root password
        # one of theses two at least should work
        ret1 = self.run_state(
            'cmd.run',
             name='mysqladmin --host="localhost" -u '
               + self.user
               + ' flush-privileges password "'
               + self.password
               + '"'
        )
        ret2 = self.run_state(
            'cmd.run',
             name='mysqladmin --host="localhost" -u '
               + self.user
               + ' --password="'
               + self.password
               + '" flush-privileges password "'
               + self.password
               + '"'
        )
        key, value = ret2.popitem()
        if value['result']:
            NO_MYSQL_SERVER = False
        else:
            self.skipTest('No MySQL Server running, or no root access on it.')

    def _userCreationLoop(self,
                         uname,
                         host,
                         password=None,
                         new_password=None,
                         new_password_hash=None,
                         **kwargs):
        '''
        Perform some tests around creation of the given user
        '''
        # First silently remove it, in case of
        ret = self.run_function(
            'mysql.user_remove',
            user=uname,
            host=host,
            **kwargs
        )
        # creation
        ret = self.run_function(
            'mysql.user_create',
            user=uname,
            host=host,
            password=password,
            **kwargs
        )
        self.assertEqual(True, ret, ('Calling user_create on'
            ' user {0!r} did not return True: {1}').format(
            uname,
            repr(ret)
        ))
        # double creation failure
        ret = self.run_function(
            'mysql.user_create',
            user=uname,
            host=host,
            password=password,
            **kwargs
        )
        self.assertEqual(False, ret, ('Calling user_create a second time on'
            ' user {0!r} did not return False: {1}').format(
            uname,
            repr(ret)
        ))
        # Alter password
        if new_password is not None or new_password_hash is not None:
            ret = self.run_function(
                'mysql.user_chpass',
                user=uname,
                host=host,
                password=new_password,
                password_hash=new_password_hash,
                connection_user=self.user,
                connection_pass=self.password,
                connection_charset='utf8',
                saltenv={"LC_ALL": "en_US.utf8"}
            )
            self.assertEqual(True, ret, ('Calling user_chpass on'
                ' user {0!r} did not return True: {1}').format(
                uname,
                repr(ret)
            ))

    def _chck_userinfo(self, user, host, check_user, check_hash):
        '''
        Internal routine to check user_info returned results
        '''
        ret = self.run_function(
            'mysql.user_info',
            user=user,
            host=host,
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8',
            saltenv={"LC_ALL": "en_US.utf8"}
        )
        if not isinstance(ret, dict):
            raise AssertionError(
                'Unexpected result while retrieving user_info for {0!r}'.format(
                    user
                )
            )
        self.assertEqual(ret['Host'], host)
        self.assertEqual(ret['Password'], check_hash)
        self.assertEqual(ret['User'], check_user)

    def _chk_remove_user(self, user, host, **kwargs):
        '''
        Internal routine to check user_remove
        '''
        ret = self.run_function(
            'mysql.user_remove',
            user=user,
            host=host,
            **kwargs
        )
        self.assertEqual(True, ret, ('Assertion failed  while removing user'
            ' {0!r} on host {1!r}: {2}').format(
            user,
            host,
            repr(ret)
        ))

    @destructiveTest
    def test_user_management(self):
        '''
        Test various users creation settings
        '''
        # Create users with rights on this database
        # and rights on other databases
        user1 = "user '1"
        user1_pwd = 'pwd`\'"1b'
        user1_pwd_hash = '*4DF33B3B12E43384677050A818327877FAB2F4BA'
        # this is : user "2'標
        user2 = 'user "2\'\xe6\xa8\x99'
        user2_pwd = 'user "2\'\xe6\xa8\x99b'
        user2_pwd_hash = '*3A38A7B94B024B983687BB9B44FB60B7AA38FE61'
        user3 = 'user "3;,?:@=&/'
        user3_pwd = 'user "3;,?:@=&/'
        user3_pwd_hash = '*AA3B1D4105A45D381C23A5C221C47EA349E1FD7D'
        # this is : user ":=;4標 in unicode instead of utf-8
        # if unicode char is counted as 1 char we hit the max user
        # size (16)
        user4 = u'user":;,?:@=&/4\u6a19'
        user4_utf8 = 'user":;,?:@=&/4\xe6\xa8\x99'
        user4_pwd = 'user "4;,?:@=&/'
        user4_pwd_hash = '*FC8EF8DBF27628E4E113359F8E7478D5CF3DD57C'
        user5 = u'user ``"5'
        user5_utf8 = 'user ``"5'
        # this is 標標標\
        user5_pwd = '\xe6\xa8\x99\xe6\xa8\x99\\'
        # this is password('標標\\')
        user5_pwd_hash = '*3752E65CDD8751AF8D889C62CFFC6C998B12C376'
        user6 = u'user %--"6'
        user6_utf8 = 'user %--"6'
        # this is : --'"% SIX標b
        user6_pwd_u = u' --\'"% SIX\u6a19b'
        user6_pwd_utf8 = ' --\'"% SIX\xe6\xa8\x99b'
        # this is password(' --\'"% SIX標b')
        user6_pwd_hash = '*90AE800593E2D407CD9E28CCAFBE42D17EEA5369'
        self._userCreationLoop(
            uname=user1,
            host='localhost',
            password='pwd`\'"1',
            new_password='pwd`\'"1b',
            connection_user=self.user,
            connection_pass=self.password
        )
        # Now check for results
        ret = self.run_function(
            'mysql.user_exists',
            user=user1,
            host='localhost',
            password=user1_pwd,
            password_hash=None,
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8',
            saltenv={"LC_ALL": "en_US.utf8"}
        )
        self.assertEqual(True, ret, ('Testing final user {0!r} on host {1!r}'
            ' existence failed').format(user1, 'localhost')
        )

        self._userCreationLoop(
            uname=user2,
            host='localhost',
            password=None,
            # this is his name hash : user "2'標
            password_hash='*EEF6F854748ACF841226BB1C2422BEC70AE7F1FF',
            # and this is the same with a 'b' added
            new_password_hash=user2_pwd_hash,
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8',
            saltenv={"LC_ALL": "en_US.utf8"}
        )
        # user2 can connect from other places with other password
        self._userCreationLoop(
            uname=user2,
            host='10.0.0.1',
            allow_passwordless=True,
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8',
            saltenv={"LC_ALL": "en_US.utf8"}
        )
        self._userCreationLoop(
            uname=user2,
            host='10.0.0.2',
            allow_passwordless=True,
            unix_socket=True,
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8',
            saltenv={"LC_ALL": "en_US.utf8"}
        )
        # Now check for results
        ret = self.run_function(
            'mysql.user_exists',
            user=user2,
            host='localhost',
            password=None,
            password_hash=user2_pwd_hash,
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8',
            saltenv={"LC_ALL": "en_US.utf8"}
        )
        self.assertEqual(True, ret, ('Testing final user {0!r} on host {1!r}'
            ' failed').format(user2, 'localhost')
        )
        ret = self.run_function(
            'mysql.user_exists',
            user=user2,
            host='10.0.0.1',
            allow_passwordless=True,
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8',
            saltenv={"LC_ALL": "en_US.utf8"}
        )
        self.assertEqual(True, ret, ('Testing final user {0!r} on host {1!r}'
            ' without password failed').format(user2, '10.0.0.1')
        )
        ret = self.run_function(
            'mysql.user_exists',
            user=user2,
            host='10.0.0.2',
            allow_passwordless=True,
            unix_socket=True,
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8',
            saltenv={"LC_ALL": "en_US.utf8"}
        )
        self.assertEqual(True, ret, ('Testing final user {0!r} on host {1!r}'
            ' without password failed').format(user2, '10.0.0.2')
        )

        # Empty password is not passwordless (or is it a bug?)
        self._userCreationLoop(
            uname=user3,
            host='localhost',
            password='',
            connection_user=self.user,
            connection_pass=self.password
        )
        # user 3 on another host with a password
        self._userCreationLoop(
            uname=user3,
            host='%',
            password='foo',
            new_password=user3_pwd,
            connection_user=self.user,
            connection_pass=self.password
        )
        # Now check for results
        ret = self.run_function(
            'mysql.user_exists',
            user=user3,
            host='localhost',
            password='',
            connection_user=self.user,
            connection_pass=self.password
        )
        self.assertEqual(True, ret, ('Testing final user {0!r} on host {1!r}'
            ' without empty password failed').format(user3, 'localhost')
        )
        ret = self.run_function(
            'mysql.user_exists',
            user=user3,
            host='%',
            password=user3_pwd,
            connection_user=self.user,
            connection_pass=self.password
        )
        self.assertEqual(True, ret, ('Testing final user {0!r} on host {1!r}'
            ' with password failed').format(user3, '%')
        )

        # check unicode name, and password > password_hash
        self._userCreationLoop(
            uname=user4,
            host='%',
            password=user4_pwd,
            # this is password('foo')
            password_hash='*F3A2A51A9B0F2BE2468926B4132313728C250DBF',
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8',
            saltenv={"LC_ALL": "en_US.utf8"}
        )
        # Now check for results
        ret = self.run_function(
            'mysql.user_exists',
            user=user4_utf8,
            host='%',
            password=user4_pwd,
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8',
            saltenv={"LC_ALL": "en_US.utf8"}
        )
        self.assertEqual(True, ret, ('Testing final user {0!r} on host {1!r}'
            ' with password take from password and not password_hash'
            ' failed').format(user4_utf8, '%')
        )
        self._userCreationLoop(
            uname=user5,
            host='localhost',
            password='\xe6\xa8\x99\xe6\xa8\x99',
            new_password=user5_pwd,
            unix_socket=True,
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8',
            saltenv={"LC_ALL": "en_US.utf8"}
        )
        ret = self.run_function(
            'mysql.user_exists',
            user=user5_utf8,
            host='localhost',
            password=user5_pwd,
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8',
            saltenv={"LC_ALL": "en_US.utf8"}
        )
        self.assertEqual(True, ret, ('Testing final user {0!r} on host {1!r}'
            ' with utf8 password failed').format(user5_utf8, 'localhost')
        )
        # for this one we give password in unicode and check it in utf-8
        self._userCreationLoop(
            uname=user6,
            host='10.0.0.1',
            password=' foobar',
            new_password=user6_pwd_u,
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8',
            saltenv={"LC_ALL": "en_US.utf8"}
        )
        # Now check for results
        ret = self.run_function(
            'mysql.user_exists',
            user=user6_utf8,
            host='10.0.0.1',
            password=user6_pwd_utf8,
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8',
            saltenv={"LC_ALL": "en_US.utf8"}
        )
        self.assertEqual(True, ret, ('Testing final user {0!r} on host {1!r}'
            ' with unicode password failed').format(user6_utf8, '10.0.0.1')
        )
        # Final result should be:
        # mysql> select Host, User, Password from user where user like 'user%';
        # +--------------------+-----------+-------------------------------+
        # | User               | Host      | Password                      |
        # +--------------------+-----------+-------------------------------+
        # | user "2'標         | 10.0.0.1  |                               |
        # | user "2'標         | 10.0.0.2  |                               |
        # | user "2'標         | localhost | *3A38A7B94B0(...)60B7AA38FE61 |
        # | user "3;,?:@=&/    | %         | *AA3B1D4105(...)47EA349E1FD7D |
        # | user "3;,?:@=&/    | localhost |                               |
        # | user %--"6         | 10.0.0.1  | *90AE800593(...)E42D17EEA5369 |
        # | user '1            | localhost | *4DF33B3B1(...)327877FAB2F4BA |
        # | user ``"5          | localhost | *3752E65CD(...)FC6C998B12C376 |
        # | user":;,?:@=&/4標  | %         | *FC8EF8DBF(...)7478D5CF3DD57C |
        # +--------------------+-----------+-------------------------------+
        self._chck_userinfo(user=user2,
                            host='10.0.0.1',
                            check_user=user2,
                            check_hash=''
        )
        self._chck_userinfo(user=user2,
                            host='10.0.0.2',
                            check_user=user2,
                            check_hash=''
        )
        self._chck_userinfo(user=user2,
                            host='localhost',
                            check_user=user2,
                            check_hash=user2_pwd_hash
        )
        self._chck_userinfo(user=user3,
                            host='%',
                            check_user=user3,
                            check_hash=user3_pwd_hash
        )
        self._chck_userinfo(user=user3,
                            host='localhost',
                            check_user=user3,
                            check_hash=''
        )
        self._chck_userinfo(user=user4,
                            host='%',
                            check_user=user4_utf8,
                            check_hash=user4_pwd_hash
        )
        self._chck_userinfo(user=user6,
                            host='10.0.0.1',
                            check_user=user6_utf8,
                            check_hash=user6_pwd_hash
        )
        self._chck_userinfo(user=user1,
                            host='localhost',
                            check_user=user1,
                            check_hash=user1_pwd_hash
        )
        self._chck_userinfo(user=user5,
                            host='localhost',
                            check_user=user5_utf8,
                            check_hash=user5_pwd_hash
        )
        # check user_list function
        ret = self.run_function(
            'mysql.user_list',
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8',
            saltenv={"LC_ALL": "en_US.utf8"}
        )
        self.assertIn({'Host': 'localhost', 'User': user1}, ret)
        self.assertIn({'Host': 'localhost', 'User': user2}, ret)
        self.assertIn({'Host': '10.0.0.1', 'User': user2}, ret)
        self.assertIn({'Host': '10.0.0.2', 'User': user2}, ret)
        self.assertIn({'Host': '%', 'User': user3}, ret)
        self.assertIn({'Host': 'localhost', 'User': user3}, ret)
        self.assertIn({'Host': '%', 'User': user4_utf8}, ret)
        self.assertIn({'Host': 'localhost', 'User': user5_utf8}, ret)
        self.assertIn({'Host': '10.0.0.1', 'User': user6_utf8}, ret)

        # And finally, test connections on MySQL with theses users
        ret = self.run_function(
            'mysql.query',
            database='information_schema',
            query='SELECT 1',
            connection_user=user1,
            connection_pass='pwd`\'"1b',
            connection_host='localhost'
        )
        if not isinstance(ret, dict) or 'results' not in ret:
            raise AssertionError(
                ('Unexpected result while testing connection'
                ' with user {0!r}: {1}').format(
                    user1,
                    repr(ret)
                )
            )
        self.assertEqual([['1']], ret['results'])

        # FIXME: still failing, but works by hand...
        # mysql --user="user \"2'標" --password="user \"2'標b" information_schema
        # Seems to be a python-mysql library problem with user names containing
        # utf8 characters
        # @see https://github.com/farcepest/MySQLdb1/issues/40
        #import urllib
        #ret = self.run_function(
        #    'mysql.query',
        #    database='information_schema',
        #    query='SELECT 1',
        #    connection_user=urllib.quote_plus(user2),
        #    connection_pass=urllib.quote_plus(user2_pwd),
        #    connection_host='localhost',
        #    connection_charset='utf8',
        #    saltenv={"LC_ALL": "en_US.utf8"}
        #)
        #if not isinstance(ret, dict) or 'results' not in ret:
        #    raise AssertionError(
        #        ('Unexpected result while testing connection'
        #        ' with user {0!r}: {1}').format(
        #            user2,
        #            repr(ret)
        #        )
        #    )
        #self.assertEqual([['1']], ret['results'])
        ret = self.run_function(
            'mysql.query',
            database='information_schema',
            query='SELECT 1',
            connection_user=user3,
            connection_pass='',
            connection_host='localhost',
        )
        if not isinstance(ret, dict) or 'results' not in ret:
            raise AssertionError(
                ('Unexpected result while testing connection'
                ' with user {0!r}: {1}').format(
                    user3,
                    repr(ret)
                )
            )
        self.assertEqual([['1']], ret['results'])
        # FIXME: Failing
        #ret = self.run_function(
        #    'mysql.query',
        #    database='information_schema',
        #    query='SELECT 1',
        #    connection_user=user4_utf8,
        #    connection_pass=user4_pwd,
        #    connection_host='localhost',
        #    connection_charset='utf8',
        #    saltenv={"LC_ALL": "en_US.utf8"}
        #)
        #if not isinstance(ret, dict) or 'results' not in ret:
        #    raise AssertionError(
        #        ('Unexpected result while testing connection'
        #        ' with user {0!r}: {1}').format(
        #            user4_utf8,
        #            repr(ret)
        #        )
        #    )
        #self.assertEqual([['1']], ret['results'])
        ret = self.run_function(
            'mysql.query',
            database='information_schema',
            query='SELECT 1',
            connection_user=user5_utf8,
            connection_pass=user5_pwd,
            connection_host='localhost',
            connection_charset='utf8',
            saltenv={"LC_ALL": "en_US.utf8"}
        )
        if not isinstance(ret, dict) or 'results' not in ret:
            raise AssertionError(
                ('Unexpected result while testing connection'
                ' with user {0!r}: {1}').format(
                    user5_utf8,
                   repr(ret)
                )
            )
        self.assertEqual([['1']], ret['results'])

        # Teardown by deleting with user_remove
        self._chk_remove_user(user=user2,
                              host='10.0.0.1',
                              connection_user=self.user,
                              connection_pass=self.password,
                              connection_charset='utf8',
                              saltenv={"LC_ALL": "en_US.utf8"}
        )
        self._chk_remove_user(user=user2,
                              host='10.0.0.2',
                              connection_user=self.user,
                              connection_pass=self.password,
                              connection_charset='utf8',
                              saltenv={"LC_ALL": "en_US.utf8"}
        )
        self._chk_remove_user(user=user2,
                              host='localhost',
                              connection_user=self.user,
                              connection_pass=self.password,
                              connection_charset='utf8',
                              saltenv={"LC_ALL": "en_US.utf8"}
        )
        self._chk_remove_user(user=user3,
                              host='%',
                              connection_user=self.user,
                              connection_pass=self.password,
        )
        self._chk_remove_user(user=user3,
                              host='localhost',
                              connection_user=self.user,
                              connection_pass=self.password,
        )
        self._chk_remove_user(user=user4,
                              host='%',
                              connection_user=self.user,
                              connection_pass=self.password,
                              connection_charset='utf8',
                              saltenv={"LC_ALL": "en_US.utf8"}
        )
        self._chk_remove_user(user=user6,
                              host='10.0.0.1',
                              connection_user=self.user,
                              connection_pass=self.password,
        )
        self._chk_remove_user(user=user1,
                              host='localhost',
                              connection_user=self.user,
                              connection_pass=self.password,
        )
        self._chk_remove_user(user=user5,
                              host='localhost',
                              connection_user=self.user,
                              connection_pass=self.password,
        )
        # Final verification of the cleanup
        ret = self.run_function(
            'mysql.user_list',
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8',
            saltenv={"LC_ALL": "en_US.utf8"}
        )
        self.assertNotIn({'Host': 'localhost', 'User': user1}, ret)
        self.assertNotIn({'Host': 'localhost', 'User': user2}, ret)
        self.assertNotIn({'Host': '10.0.0.1', 'User': user2}, ret)
        self.assertNotIn({'Host': '10.0.0.2', 'User': user2}, ret)
        self.assertNotIn({'Host': '%', 'User': user3}, ret)
        self.assertNotIn({'Host': 'localhost', 'User': user3}, ret)
        self.assertNotIn({'Host': '%', 'User': user4_utf8}, ret)
        self.assertNotIn({'Host': 'localhost', 'User': user5_utf8}, ret)
        self.assertNotIn({'Host': '10.0.0.1', 'User': user6_utf8}, ret)


@skipIf(
    NO_MYSQL,
    'Please install MySQL bindings and a MySQL Server before running'
    'MySQL integration tests.'
)
class MysqlModuleUserGrantTest(integration.ModuleCase,
                      integration.SaltReturnAssertsMixIn):
    '''
    User Creation and connection tests
    '''

    user = 'root'
    password = 'poney'
    # yep, theses are valid MySQL db names
    # very special chars are _ % and .
    testdb1 = 'tes.t\'"saltdb'
    testdb2 = 't_st `(:=salt%b)'
    testdb3 = 'test `(:=salteeb)'
    table1 = 'foo'
    table2 = "foo `\'%_bar"
    users = {
        'user1': {
            'name': 'foo',
            'pwd': 'bar',
        },
        'user2': {
            'name': 'user ";--,?:&/\\',
            'pwd': '";--(),?:@=&/\\',
        },
        # this is : passwd 標標
        'user3': {
            'name': 'user( @ )=foobar',
            'pwd': '\xe6\xa8\x99\xe6\xa8\x99',
        },
        # this is : user/password containing 標標
        'user4': {
            'name': 'user \xe6\xa8\x99',
            'pwd': '\xe6\xa8\x99\xe6\xa8\x99',
        },
    }

    @destructiveTest
    def setUp(self):
        '''
        Test presence of MySQL server, enforce a root password, create users
        '''
        super(MysqlModuleUserGrantTest, self).setUp()
        NO_MYSQL_SERVER = True
        # now ensure we know the mysql root password
        # one of theses two at least should work
        ret1 = self.run_state(
            'cmd.run',
             name='mysqladmin --host="localhost" -u '
               + self.user
               + ' flush-privileges password "'
               + self.password
               + '"'
        )
        ret2 = self.run_state(
            'cmd.run',
             name='mysqladmin --host="localhost" -u '
               + self.user
               + ' --password="'
               + self.password
               + '" flush-privileges password "'
               + self.password
               + '"'
        )
        key, value = ret2.popitem()
        if value['result']:
            NO_MYSQL_SERVER = False
        else:
            self.skipTest('No MySQL Server running, or no root access on it.')
        # Create some users and a test db
        for user, userdef in self.users.iteritems():
            self._userCreation(uname=userdef['name'], password=userdef['pwd'])
        self.run_function(
            'mysql.db_create',
            name=self.testdb1,
            connection_user=self.user,
            connection_pass=self.password,
        )
        self.run_function(
            'mysql.db_create',
            name=self.testdb2,
            connection_user=self.user,
            connection_pass=self.password,
        )
        create_query = ('CREATE TABLE {tblname} ('
            ' id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,'
            ' data VARCHAR(100)) ENGINE={engine};'.format(
            tblname=mysqlmod.quote_identifier(self.table1),
            engine='MYISAM',
        ))
        log.info('Adding table {0!r}'.format(self.table1,))
        self.run_function(
            'mysql.query',
            database=self.testdb2,
            query=create_query,
            connection_user=self.user,
            connection_pass=self.password
        )
        create_query = ('CREATE TABLE {tblname} ('
            ' id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,'
            ' data VARCHAR(100)) ENGINE={engine};'.format(
            tblname=mysqlmod.quote_identifier(self.table2),
            engine='MYISAM',
        ))
        log.info('Adding table {0!r}'.format(self.table2,))
        self.run_function(
            'mysql.query',
            database=self.testdb2,
            query=create_query,
            connection_user=self.user,
            connection_pass=self.password
        )

    @destructiveTest
    def tearDown(self):
        '''
        Removes created users and db
        '''
        for user, userdef in self.users.iteritems():
            self._userRemoval(uname=userdef['name'], password=userdef['pwd'])
        self.run_function(
            'mysql.db_remove',
            name=self.testdb1,
            connection_user=self.user,
            connection_pass=self.password,
       )
        self.run_function(
            'mysql.db_remove',
            name=self.testdb2,
            connection_user=self.user,
            connection_pass=self.password,
       )

    def _userCreation(self,
                      uname,
                      password=None):
        '''
        Create a test user
        '''
        self.run_function(
            'mysql.user_create',
            user=uname,
            host='localhost',
            password=password,
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8',
            saltenv={"LC_ALL": "en_US.utf8"}
        )

    def _userRemoval(self,
                     uname,
                     password=None):
        '''
        Removes a test user
        '''
        self.run_function(
            'mysql.user_remove',
            user=uname,
            host='localhost',
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8',
            saltenv={"LC_ALL": "en_US.utf8"}
        )

    def _addGrantRoutine(self,
                         grant,
                         user,
                         db,
                         grant_option=False,
                         escape=True,
                         **kwargs):
        '''
        Perform some tests around creation of the given grants
        '''
        ret = self.run_function(
            'mysql.grant_add',
            grant=grant,
            database=db,
            user=user,
            grant_option=grant_option,
            escape=escape,
            **kwargs
        )
        self.assertEqual(True, ret, ('Calling grant_add on'
            ' user {0!r} and grants {1!r} did not return True: {2}').format(
            user,
            grant,
            repr(ret)
        ))
        ret = self.run_function(
            'mysql.grant_exists',
            grant=grant,
            database=db,
            user=user,
            grant_option=grant_option,
            escape=escape,
            **kwargs
        )
        self.assertEqual(True, ret, ('Calling grant_exists on'
            ' user {0!r} and grants {1!r} did not return True: {2}').format(
            user,
            grant,
            repr(ret)
        ))

    @destructiveTest
    def testGrants(self):
        '''
        Test user grant methods
        '''
        self._addGrantRoutine(
            grant='SELECT, INSERT,UPDATE, CREATE',
            user=self.users['user1']['name'],
            db=self.testdb1 + '.*',
            grant_option=True,
            escape=True,
            connection_user=self.user,
            connection_pass=self.password
        )
        self._addGrantRoutine(
            grant='INSERT, SELECT',
            user=self.users['user1']['name'],
            db=self.testdb2 + '.' + self.table1,
            grant_option=True,
            escape=True,
            connection_user=self.user,
            connection_pass=self.password
        )
        self._addGrantRoutine(
            grant='  SELECT, UPDATE,DELETE, CREATE TEMPORARY TABLES',
            user=self.users['user2']['name'],
            db=self.testdb1 + '.*',
            grant_option=True,
            escape=True,
            connection_user=self.user,
            connection_pass=self.password
        )
        self._addGrantRoutine(
            grant='select, ALTER,CREATE TEMPORARY TABLES, EXECUTE ',
            user=self.users['user3']['name'],
            db=self.testdb1 + '.*',
            grant_option=True,
            escape=True,
            connection_user=self.user,
            connection_pass=self.password
        )
        self._addGrantRoutine(
            grant='SELECT, INSERT',
            user=self.users['user4']['name'],
            db=self.testdb2 + '.' + self.table2,
            grant_option=False,
            escape=True,
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8'
        )
        self._addGrantRoutine(
            grant='CREATE',
            user=self.users['user4']['name'],
            db=self.testdb2 + '.*',
            grant_option=False,
            escape=True,
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8'
        )
        self._addGrantRoutine(
            grant='SELECT, INSERT',
            user=self.users['user4']['name'],
            db=self.testdb2 + '.' + self.table1,
            grant_option=False,
            escape=True,
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8'
        )
        # '' is valid for anonymous users
        self._addGrantRoutine(
            grant='DELETE',
            user='',
            db=self.testdb3 + '.*',
            grant_option=False,
            escape=True,
            connection_user=self.user,
            connection_pass=self.password
        )
        # Check result for users
        ret = self.run_function(
            'mysql.user_grants',
            user=self.users['user1']['name'],
            host='localhost',
            connection_user=self.user,
            connection_pass=self.password
        )
        self.assertEqual(ret, [
            "GRANT USAGE ON *.* TO 'foo'@'localhost'",
            ('GRANT SELECT, INSERT, UPDATE, CREATE ON '
             '`tes.t\'"saltdb`.* TO \'foo\'@\'localhost\' WITH GRANT OPTION'),
            ("GRANT SELECT, INSERT ON `t_st ``(:=salt%b)`.`foo`"
             " TO 'foo'@'localhost' WITH GRANT OPTION")
        ])

        ret = self.run_function(
            'mysql.user_grants',
            user=self.users['user2']['name'],
            host='localhost',
            connection_user=self.user,
            connection_pass=self.password
        )
        self.assertEqual(ret, [
            'GRANT USAGE ON *.* TO \'user ";--,?:&/\\\'@\'localhost\'',
            ('GRANT SELECT, UPDATE, DELETE, CREATE TEMPORARY TABLES ON `tes.t\''
             '"saltdb`.* TO \'user ";--,?:&/\\\'@\'localhost\''
             ' WITH GRANT OPTION')
        ])

        ret = self.run_function(
            'mysql.user_grants',
            user=self.users['user3']['name'],
            host='localhost',
            connection_user=self.user,
            connection_pass=self.password
        )
        self.assertEqual(ret, [
            "GRANT USAGE ON *.* TO 'user( @ )=foobar'@'localhost'",
            ('GRANT SELECT, ALTER, CREATE TEMPORARY TABLES, EXECUTE ON '
             '`tes.t\'"saltdb`.* TO \'user( @ )=foobar\'@\'localhost\' '
             'WITH GRANT OPTION')
        ])

        ret = self.run_function(
            'mysql.user_grants',
            user=self.users['user4']['name'],
            host='localhost',
            connection_user=self.user,
            connection_pass=self.password,
            connection_charset='utf8'
        )
        self.assertEqual(ret, [
            "GRANT USAGE ON *.* TO 'user \xe6\xa8\x99'@'localhost'",
            (r"GRANT CREATE ON `t\_st ``(:=salt\%b)`.* TO "
             "'user \xe6\xa8\x99'@'localhost'"),
            ("GRANT SELECT, INSERT ON `t_st ``(:=salt%b)`.`foo ``'%_bar` TO "
             "'user \xe6\xa8\x99'@'localhost'"),
            ("GRANT SELECT, INSERT ON `t_st ``(:=salt%b)`.`foo` TO "
             "'user \xe6\xa8\x99'@'localhost'"),
        ])

        ret = self.run_function(
            'mysql.user_grants',
            user='',
            host='localhost',
            connection_user=self.user,
            connection_pass=self.password
        )
        self.assertEqual(ret, [
            "GRANT USAGE ON *.* TO ''@'localhost'",
            "GRANT DELETE ON `test ``(:=salteeb)`.* TO ''@'localhost'"
        ])

if __name__ == '__main__':
    from integration import run_tests
    run_tests(MysqlModuleDbTest, MysqlModuleUserTest)
